package com.gbase8c.dmt.migration;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gbase8c.dmt.common.util.JsonMapper;
import com.gbase8c.dmt.config.DmtConfig;
import com.gbase8c.dmt.dao.DataSourceDao;
import com.gbase8c.dmt.dao.SnapshotDao;
import com.gbase8c.dmt.db.metadata.Metadata;
import com.gbase8c.dmt.db.metadata.MetadataFactory;
import com.gbase8c.dmt.model.enums.MigrationObjectType;
import com.gbase8c.dmt.model.enums.DbType;
import com.gbase8c.dmt.model.enums.Status;
import com.gbase8c.dmt.model.migration.config.MigrateConfig;
import com.gbase8c.dmt.model.migration.config.Snapshot;
import com.gbase8c.dmt.model.migration.config.Task;
import com.gbase8c.dmt.model.migration.dto.DataSourceDto;
import com.gbase8c.dmt.model.migration.dto.DboDto;
import com.gbase8c.dmt.model.migration.dto.MigrationObject;
import com.gbase8c.dmt.model.migration.dto.TableDto;
import com.gbase8c.dmt.model.migration.job.DataxJob;
import com.gbase8c.dmt.model.enums.MigrationTaskStatus;
import com.gbase8c.dmt.model.migration.record.Mo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.sql.Types;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * Assembles the ordered collection of migration objects for a task and, when data migration
 * is enabled, builds one reader/writer job description per table.
 *
 * <p>The class also hosts a static, process-wide registry of {@link MigrationTaskStatus}
 * values keyed by task id. The registry is backed by a {@link ConcurrentHashMap}, so neither
 * keys nor values may be {@code null}.
 */
@Slf4j
public class MigrationObjectService {

    /** Value written to the reader's {@code fetchSize} parameter for every non-MySQL source. */
    private static final int READER_FETCH_SIZE = 2048;

    /** Value written to the reader's {@code pageSize} parameter. */
    private static final int READER_PAGE_SIZE = 100000;

    /** Value written to {@code setting.errorLimit.record}. */
    private static final int ERROR_LIMIT_RECORD = 0;

    /** Value written to {@code setting.errorLimit.percentage}. */
    private static final double ERROR_LIMIT_PERCENTAGE = 0.02;

    /** Table property holding a comma-separated list of partition column names. */
    private static final String PROP_PARTITION_COLUMN = "PARTITIONCOLUMN";

    /**
     * Table property holding the distribution column name. Spelled out as a literal: deriving
     * it with {@code "distributeColumn".toUpperCase()} is locale-sensitive and produces a
     * different key under e.g. a Turkish default locale.
     */
    private static final String PROP_DISTRIBUTE_COLUMN = "DISTRIBUTECOLUMN";

    /** Key looked up in the parsed snapshot info to obtain the Oracle SCN. */
    private static final String SNAPSHOT_SCN_KEY = "SCN";

    private static final ConcurrentHashMap<String, MigrationTaskStatus> taskStatus = new ConcurrentHashMap<>();

    private final DataSourceDao dataSourceDao;
    private final DmtConfig dmtConfig;
    private final SnapshotDao snapshotDao;

    /**
     * @param dmtConfig     supplies the work directory, task id and migration order
     * @param dataSourceDao used to resolve the task's source and target data sources
     * @param snapshotDao   used to resolve the task's snapshot, when one is configured
     */
    public MigrationObjectService(DmtConfig dmtConfig,
                                  DataSourceDao dataSourceDao,
                                  SnapshotDao snapshotDao) {
        this.dmtConfig = dmtConfig;
        this.dataSourceDao = dataSourceDao;
        this.snapshotDao = snapshotDao;
    }

    /**
     * Records (or overwrites) the status of a task in the static registry.
     *
     * @param taskId task identifier; must not be {@code null}
     * @param status new status; must not be {@code null}
     */
    public static void updateTaskStatus(String taskId, MigrationTaskStatus status) {
        taskStatus.put(taskId, status);
    }

    /**
     * @param taskId task identifier; must not be {@code null}
     * @return the recorded status, or {@code null} when none is registered for {@code taskId}
     */
    public static MigrationTaskStatus getTaskStatus(String taskId) {
        return taskStatus.get(taskId);
    }

    /**
     * Drops the status entry of a task, if present.
     *
     * @param taskId task identifier; must not be {@code null}
     */
    public static void removeTaskStatus(String taskId) {
        taskStatus.remove(taskId);
    }

    /**
     * Copies each element into a plain {@link MigrationObject}, carrying over only the
     * properties declared on that base type. Subtype-specific state is not copied.
     *
     * @param mos objects to copy; may be {@code null} or empty
     * @return a new list of copies, empty when {@code mos} is {@code null} or empty
     */
    private List<MigrationObject> migrationObjects(List<? extends MigrationObject> mos) {
        if (CollectionUtils.isEmpty(mos)) {
            return Lists.newArrayList();
        }
        return mos.stream()
                .map(MigrationObjectService::copyOf)
                .collect(Collectors.toList());
    }

    /** Creates a plain {@link MigrationObject} holding the base-type properties of {@code mo}. */
    private static MigrationObject copyOf(MigrationObject mo) {
        MigrationObject copy = new MigrationObject();
        copy.setMigrationObjectType(mo.getMigrationObjectType());
        copy.setSchema(mo.getSchema());
        copy.setTarSchema(mo.getTarSchema());
        copy.setName(mo.getName());
        copy.setTarName(mo.getTarName());
        copy.setTableMapper(mo.getTableMapper());
        copy.setContents(mo.getContents());
        copy.setStatus(mo.getStatus());
        copy.setInitializable(mo.getInitializable());
        copy.setConvertible(mo.getConvertible());
        copy.setInitMsg(mo.getInitMsg());
        copy.setConvertMsg(mo.getConvertMsg());
        copy.setNoteMsg(mo.getNoteMsg());
        copy.setMsg(mo.getMsg());
        return copy;
    }

    /**
     * Returns the task's migration objects grouped by type, in the order given by
     * {@code dmtConfig.getOrder()}. Types with no objects are omitted.
     *
     * <p>Side effects:
     * <ul>
     *   <li>when data migration is enabled, a {@code DATA} entry holding the generated jobs is
     *       added to the map returned by {@code dboDto.dbObjects()};</li>
     *   <li>for every returned type, an initial status file (all objects {@code UnStart}) is
     *       written through {@link #persistent(DmtConfig, MigrationObjectType, List)}.</li>
     * </ul>
     *
     * @param dboDto the selected database objects together with their task
     * @return an insertion-ordered map from object type to copies of its migration objects
     */
    public Map<MigrationObjectType, List<? extends MigrationObject>> getMigrationObjects(DboDto dboDto) {
        Map<MigrationObjectType, List<? extends MigrationObject>> orderedDbObjects = Maps.newLinkedHashMap();
        Map<MigrationObjectType, List<? extends MigrationObject>> dbObjects = dboDto.dbObjects();

        Task task = dboDto.getTask();
        MigrateConfig migrateConfig = task.getMigrateConfig();

        // Null-safe: an unset flag is treated as "do not migrate data" instead of failing on unboxing.
        if (Boolean.TRUE.equals(migrateConfig.getMigrateData())) {
            List<TableDto> tableDtos = dboDto.getSchemaDtos().stream()
                    .flatMap(schemaDto -> schemaDto.getTableDtos().stream())
                    .collect(Collectors.toList());
            List<DataxJob> dataxJobs = buildDataxJobs(task, tableDtos, migrateConfig.getDmSetting());
            dbObjects.put(MigrationObjectType.DATA, dataxJobs);
        }

        for (MigrationObjectType migrationObjectType : dmtConfig.getOrder()) {
            List<? extends MigrationObject> objectsOfType = dbObjects.get(migrationObjectType);
            if (CollectionUtils.isNotEmpty(objectsOfType)) {
                orderedDbObjects.put(migrationObjectType, migrationObjects(objectsOfType));
            }
        }

        for (Map.Entry<MigrationObjectType, List<? extends MigrationObject>> entry : orderedDbObjects.entrySet()) {
            MigrationObjectType migrationObjectType = entry.getKey();
            List<Mo> mos = entry.getValue().stream()
                    .map(migrationObject -> Mo.builder()
                            .type(migrationObjectType.getName())
                            .name(recordName(migrationObjectType, migrationObject))
                            .status(Status.UnStart.name())
                            .build())
                    .collect(Collectors.toList());
            persistent(dmtConfig, migrationObjectType, mos);
        }

        return orderedDbObjects;
    }

    /**
     * Name used in the status record: the bare name for tablespaces and schemas, otherwise
     * {@code schema.name}.
     */
    private static String recordName(MigrationObjectType type, MigrationObject migrationObject) {
        boolean unqualified = MigrationObjectType.TABLESPACE.equals(type)
                || MigrationObjectType.SCHEMA.equals(type);
        return unqualified
                ? migrationObject.getName()
                : migrationObject.getSchema() + "." + migrationObject.getName();
    }

    /**
     * Writes {@code mos} as JSON to
     * {@code <workDir>/<taskId>/status/<type name>.json}, where the work directory and task id
     * come from {@code dmtConfig}.
     *
     * @param dmtConfig source of the work directory and task id
     * @param mot       object type whose name becomes the file name
     * @param mos       status records to serialise
     */
    public static void persistent(DmtConfig dmtConfig, MigrationObjectType mot, List<Mo> mos) {
        String dbObjectFile = StringUtils.joinWith(File.separator,
                dmtConfig.getWorkDir(), dmtConfig.getTaskId(), "status", mot.getName() + ".json");
        JsonMapper.nonEmptyMapper().toFile(new File(dbObjectFile), mos);
    }

    /**
     * Computes a base-31 polynomial hash over the UTF-16 code units of {@code str}, using the
     * recurrence {@code h = 31 * h + c} accumulated in a {@code long}. The result silently
     * wraps around if it overflows 64 bits.
     *
     * @param str input string; may be {@code null}
     * @return the hash, or {@code 0} for {@code null} or empty input
     */
    public static long toLong(String str) {
        long h = 0;
        if (str != null) {
            for (int i = 0; i < str.length(); i++) {
                h = 31 * h + str.charAt(i);
            }
        }
        return h;
    }

    /**
     * Builds one {@link DataxJob} per table. The job id is {@link #toLong(String)} of
     * {@code schema.name}; the serialised job description is stored both as the single element
     * of {@code contents} and as {@code json}.
     *
     * @param task      task providing the id and the source/target data source keys
     * @param tableDtos tables to create jobs for
     * @param dmSetting data-migration tuning options; may be {@code null}
     * @return the jobs, in the order of {@code tableDtos}
     */
    private List<DataxJob> buildDataxJobs(Task task,
                                          List<TableDto> tableDtos,
                                          MigrateConfig.DmSetting dmSetting) {

        List<DataxJob> dataxJobs = Lists.newArrayList();
        DataSourceDto src = dataSourceDao.get(task.getSrc());
        DataSourceDto tar = dataSourceDao.get(task.getTar());
        for (TableDto tableDto : tableDtos) {
            String jobName = tableDto.getSchema() + "." + tableDto.getName();
            long jobId = toLong(jobName);

            Map<String, Object> dataxMap = Maps.newHashMap();
            dataxMap.put("job", buildJob(task, src, tar, tableDto, dmSetting));
            // Serialise once and reuse the string for both fields.
            String json = JSON.toJSONString(dataxMap);

            dataxJobs.add(DataxJob.builder()
                    .taskId(task.getId())
                    .tableJobName(jobName)
                    .schema(tableDto.getSchema())
                    .name(tableDto.getName())
                    .migrationObjectType(MigrationObjectType.DATA)
                    .jobId(jobId)
                    .src(src)
                    .tar(tar)
                    .contents(Lists.newArrayList(json))
                    .json(json)
                    .build());
        }
        return dataxJobs;
    }

    /**
     * Builds the {@code job} node: work directory, task id, {@code schema.name}, the
     * {@code setting} node and a single-element {@code content} list holding the reader and
     * writer.
     */
    private Map<String, Object> buildJob(Task task,
                                         DataSourceDto src,
                                         DataSourceDto tar,
                                         TableDto tableDto,
                                         MigrateConfig.DmSetting dmSetting) {
        Map<String, Object> job = Maps.newHashMap();
        job.put("workdir", dmtConfig.getWorkDir());
        job.put("_id", task.getId());
        job.put("name", tableDto.getSchema() + "." + tableDto.getName());
        job.put("setting", buildSetting(dmSetting));

        Map<String, Object> contentMap = Maps.newHashMap();
        contentMap.put("reader", buildReader(task, src, tableDto, dmSetting));
        contentMap.put("writer", buildWriter(tar, tableDto, dmSetting));
        job.put("content", Lists.newArrayList(contentMap));

        return job;
    }

    /**
     * Decides whether a column may serve as the reader's split key: integer-like columns always
     * qualify, string columns only when they are part of the primary key.
     *
     * @param column column to test
     * @param src    source data source (currently not consulted)
     */
    private static boolean isPKTypeValid(TableDto.ColumnDto column, DataSourceDto src) {
        return isLongType(column, src) || (column.isPk() && isStringType(column, src));
    }

    /**
     * @return {@code true} for BIGINT, INTEGER, SMALLINT and TINYINT, and for NUMERIC columns
     *         whose scale is unset or zero
     */
    private static boolean isLongType(TableDto.ColumnDto column, DataSourceDto src) {
        int type = column.getSqlType();
        if (type == Types.BIGINT || type == Types.INTEGER
                || type == Types.SMALLINT || type == Types.TINYINT) {
            return true;
        }
        return type == Types.NUMERIC
                && (column.getDataScale() == null || column.getDataScale() == 0);
    }

    /** @return {@code true} for CHAR, NCHAR, VARCHAR, LONGVARCHAR and NVARCHAR columns */
    private static boolean isStringType(TableDto.ColumnDto column, DataSourceDto src) {
        int type = column.getSqlType();
        return type == Types.CHAR || type == Types.NCHAR
                || type == Types.VARCHAR || type == Types.LONGVARCHAR
                || type == Types.NVARCHAR;
    }

    /**
     * Finds the first column of {@code tableDto} whose name equals {@code name} exactly
     * (case-sensitive).
     *
     * @return the matching column, or {@code null} when there is none
     */
    private TableDto.ColumnDto columnDto(TableDto tableDto, String name) {
        return tableDto.getColumnDtos().stream()
                .filter(columnDto -> StringUtils.equals(columnDto.getName(), name))
                .findFirst()
                .orElse(null);
    }

    /**
     * Collects split-key candidates in priority order: primary-key columns, then the columns
     * named by the {@code PARTITIONCOLUMN} table property (comma-separated), then the column
     * named by the {@code DISTRIBUTECOLUMN} property. Property entries that do not match a
     * column are skipped; a column may appear more than once.
     */
    private List<TableDto.ColumnDto> candidateColumns(TableDto tableDto) {
        List<TableDto.ColumnDto> candidateColumns = Lists.newArrayList();

        List<TableDto.ColumnDto> pkColumns = tableDto.getColumnDtos().stream()
                .filter(TableDto.ColumnDto::isPk)
                .collect(Collectors.toList());
        candidateColumns.addAll(pkColumns);

        Map<String, Object> properties = tableDto.getProperties();
        if (properties == null) {
            // No table properties available: only the primary-key columns are candidates.
            return candidateColumns;
        }

        String partitionColumn = (String) properties.get(PROP_PARTITION_COLUMN);
        if (StringUtils.isNotBlank(partitionColumn)) {
            for (String columnName : StringUtils.split(partitionColumn, ",")) {
                TableDto.ColumnDto partColumn = columnDto(tableDto, columnName);
                if (partColumn != null) {
                    candidateColumns.add(partColumn);
                }
            }
        }

        String distributeColumn = (String) properties.get(PROP_DISTRIBUTE_COLUMN);
        if (StringUtils.isNotBlank(distributeColumn)) {
            TableDto.ColumnDto distColumn = columnDto(tableDto, distributeColumn);
            if (distColumn != null) {
                candidateColumns.add(distColumn);
            }
        }
        return candidateColumns;
    }

    /**
     * @return the first candidate column accepted by {@link #isPKTypeValid}, or {@code null}
     *         when no candidate qualifies
     */
    private TableDto.ColumnDto splitPkColumn(DataSourceDto src, TableDto tableDto) {
        for (TableDto.ColumnDto candidate : candidateColumns(tableDto)) {
            if (isPKTypeValid(candidate, src)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Builds the {@code reader} node for one table.
     *
     * <p>The plugin name is the lower-cased source database type followed by {@code "reader"}.
     * Identifiers are passed through {@code metadata.wrap(..., true)}. When the task references
     * a snapshot and the source is Oracle, the snapshot's {@code SCN} value is added as
     * {@code scn}.
     *
     * @param task      task, consulted for the snapshot id
     * @param src       source data source
     * @param tableDto  table to read
     * @param dmSetting data-migration tuning options (currently not consulted here)
     * @throws IllegalStateException if the task references a snapshot that cannot be found
     */
    private Map<String, Object> buildReader(Task task, DataSourceDto src, TableDto tableDto, MigrateConfig.DmSetting dmSetting) {
        Metadata metadata = MetadataFactory.getMetaData(src);
        Map<String, Object> reader = Maps.newHashMap();

        // Locale.ROOT keeps the plugin name stable regardless of the JVM's default locale.
        String readerName = src.getDbType().name().toLowerCase(Locale.ROOT) + "reader";
        reader.put("name", readerName);

        HashMap<String, Object> paraMap = new HashMap<>();

        // TODO: split-key selection currently only caters for Oracle, and is incomplete.
        TableDto.ColumnDto splitPkColumn = splitPkColumn(src, tableDto);
        if (splitPkColumn != null) {
            paraMap.put("splitPk", metadata.wrap(splitPkColumn.getName(), true));
        }

        paraMap.put("username", src.getUsername());
        paraMap.put("password", src.getPassword());
        if (!DbType.MySQL.equals(src.getDbType())) {
            paraMap.put("fetchSize", READER_FETCH_SIZE);
        }
        List<String> columnNames = tableDto.getColumnDtos().stream()
                .map(TableDto.ColumnDto::getName)
                .map(cn -> metadata.wrap(cn, true))
                .collect(Collectors.toList());
        paraMap.put("column", columnNames);

        // TODO: a fixed "scn" and a "samplePercentage" option were sketched here but are not wired up.
        paraMap.put("pageSize", READER_PAGE_SIZE);

        // Snapshot settings.
        if (StringUtils.isNotBlank(task.getSnapshotId())) {
            Snapshot snapshot = snapshotDao.get(task.getSnapshotId());
            if (snapshot == null) {
                // Fail with a clear message instead of an anonymous NullPointerException.
                throw new IllegalStateException("Snapshot not found: " + task.getSnapshotId());
            }
            String info = snapshot.getInfo();
            if (StringUtils.isNotBlank(info)) {
                Map<?, ?> properties = JsonMapper.nonEmptyMapper().fromJson(info, Map.class);
                // TODO: apply snapshot settings according to the database type.
                if (DbType.Oracle.equals(src.getDbType())) {
                    paraMap.put("scn", properties.get(SNAPSHOT_SCN_KEY));
                }
            }
        }

        // TODO: support reading per partition (for Oracle, partition names could be listed from
        // ALL_TAB_PARTITIONS ordered by partition_position and passed as "partition"). Disabled
        // for now: with resumable data transfer, reading by partition makes it impossible to
        // generate the target-side delete SQL programmatically.

        HashMap<String, Object> connMap = new HashMap<>();
        connMap.put("jdbcUrl", Lists.newArrayList(src.getJdbcUrl()));
        connMap.put("table", Lists.newArrayList(StringUtils.joinWith(".",
                metadata.wrap(tableDto.getSchema(), true),
                metadata.wrap(tableDto.getName(), true))));
        paraMap.put("connection", Lists.newArrayList(connMap));

        reader.put("parameter", paraMap);
        return reader;
    }

    /**
     * Builds the {@code setting} node: a {@code speed} map containing whichever of
     * {@code channel}, {@code record} and {@code byte} are set on {@code dmSetting}, and a fixed
     * {@code errorLimit} map.
     *
     * @param dmSetting data-migration tuning options; when {@code null} the speed map is empty
     */
    private Map<String, Object> buildSetting(MigrateConfig.DmSetting dmSetting) {
        Map<String, Object> setting = Maps.newHashMap();

        Map<String, Object> speed = new HashMap<>();
        if (dmSetting != null) {
            if (ObjectUtils.isNotEmpty(dmSetting.getChannel())) {
                speed.put("channel", dmSetting.getChannel());
            }
            if (ObjectUtils.isNotEmpty(dmSetting.getRecordLimit())) {
                speed.put("record", dmSetting.getRecordLimit());
            }
            if (ObjectUtils.isNotEmpty(dmSetting.getByteSpeed())) {
                speed.put("byte", dmSetting.getByteSpeed());
            }
        }

        Map<String, Object> errorLimit = Maps.newHashMap();
        errorLimit.put("record", ERROR_LIMIT_RECORD);
        errorLimit.put("percentage", ERROR_LIMIT_PERCENTAGE);

        setting.put("speed", speed);
        setting.put("errorLimit", errorLimit);
        return setting;
    }

    /**
     * Builds the {@code writer} node for one table.
     *
     * <p>The plugin name is the lower-cased target database type followed by {@code "writer"}.
     * Target identifiers are passed through {@code metadata.wrap(..., true)}. Unlike the
     * reader, {@code jdbcUrl} is written as a plain string rather than a list. For GBase8c
     * targets a {@code session} statement disabling the session timeout is added, and
     * {@code batchSize} is included when set on {@code dmSetting}.
     *
     * @param tar       target data source
     * @param tableDto  table to write
     * @param dmSetting data-migration tuning options; may be {@code null}
     */
    private Map<String, Object> buildWriter(DataSourceDto tar, TableDto tableDto, MigrateConfig.DmSetting dmSetting) {
        Metadata metadata = MetadataFactory.getMetaData(tar);
        Map<String, Object> writer = new JSONObject();

        // Locale.ROOT keeps the plugin name stable regardless of the JVM's default locale.
        String writerName = tar.getDbType().name().toLowerCase(Locale.ROOT) + "writer";
        writer.put("name", writerName);

        HashMap<String, Object> paraMap = new HashMap<>();
        paraMap.put("username", tar.getUsername());
        paraMap.put("password", tar.getPassword());
        List<String> columnNames = tableDto.getColumnDtos().stream()
                .map(columnDto -> metadata.wrap(columnDto.getTarName(), true))
                .collect(Collectors.toList());
        paraMap.put("column", columnNames);

        HashMap<String, Object> connMap = new HashMap<>();
        connMap.put("jdbcUrl", tar.getJdbcUrl());
        connMap.put("table", Lists.newArrayList(StringUtils.joinWith(".",
                metadata.wrap(tableDto.getTarSchema(), true),
                metadata.wrap(tableDto.getTarName(), true))));
        paraMap.put("connection", Lists.newArrayList(connMap));
        if (DbType.GBase8c.equals(tar.getDbType())) {
            paraMap.put("session", Lists.newArrayList("set session_timeout to 0"));
        }
        if (dmSetting != null && ObjectUtils.isNotEmpty(dmSetting.getBatchSize())) {
            paraMap.put("batchSize", dmSetting.getBatchSize());
        }

        writer.put("parameter", paraMap);
        return writer;
    }
}
